932ecd9a8bc3feade0b7c0712dac6ed393231b95,test/src/test/java/org/corfudb/runtime/object/transactions/OptimisticTXConcurrencyTest.java,OptimisticTXConcurrencyTest,testOptimism,#,171

Before Change


        assertThat(numTasks).isGreaterThan(1); // don't change concurrency to less than 2, test will break

        // the state machine: an ordered list of steps, each a callback taking
        // (thread number, task number)
        ArrayList<BiConsumer<Integer, Integer>> stateMachine = new ArrayList<>();

        // SM step 1: start an optimistic transaction
        stateMachine.add((Integer ignored_thread_num, Integer ignored_task_num) -> {
            TXBegin();
        });

        // SM step 2: task k modify counter k
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            sharedCounters.get(task_num).setValue(OVERWRITE_ONCE);
        });

        // SM step 3: task k reads counter k+1
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
                    .isBetween(INITIAL, OVERWRITE_ONCE);
        });

        // SM step 4: task k verifies opacity, checking that it can read its own modified value of counter k
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            assertThat(sharedCounters.get(task_num).getValue())
                    .isEqualTo(OVERWRITE_ONCE);
        });

        // SM step 5: task k overwrites counter k+1
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            sharedCounters.get((task_num+1)%numTasks).setValue(OVERWRITE_TWICE);
        } );

        // SM step 6: task k again check opacity, reading its own modified value, this time of counter k+1
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
                    .isEqualTo(OVERWRITE_TWICE);
        });

        // SM step 7: each thread again verifies opacity, checking that it can re-read counter k
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            assertThat(sharedCounters.get(task_num).getValue())
                    .isEqualTo(OVERWRITE_ONCE);
        });

        // SM step 8: try to commit all transactions;
        // task k aborts only if one or both of (k-1), (k+1) committed before it
        stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
            try {
                TXEnd();
                commitStatus.set(task_num, COMMITVALUE);
            } catch (TransactionAbortedException tae) {
                assertThat(commitStatus.get((task_num + 1) % numTasks) == COMMITVALUE ||
                                commitStatus.get((task_num - 1) % numTasks) == COMMITVALUE)
                        .isTrue();
            }
        } );

        // invoke the interleaving engine
        scheduleInterleaved(PARAMETERS.CONCURRENCY_SOME, numTasks, stateMachine);
    }

    void TXEnd() {

After Change


        });

        // SM step 2: task k modify counter k
        addTestStep((task_num) -> {
            sharedCounters.get(task_num).setValue(OVERWRITE_ONCE);
        });

        // SM step 3: task k reads counter k+1
        addTestStep((task_num) -> {
            assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
                    .isBetween(INITIAL, OVERWRITE_ONCE);
        });

        // SM step 4: task k verifies opacity, checking that it can read its own modified value of counter k
        addTestStep((task_num) -> {
            assertThat(sharedCounters.get(task_num).getValue())
                    .isEqualTo(OVERWRITE_ONCE);
        });

        // SM step 5: task k overwrites counter k+1
        addTestStep((task_num) -> {
            sharedCounters.get((task_num + 1) % numTasks).setValue(OVERWRITE_TWICE);
        });

        // SM step 6: task k again check opacity, reading its own modified value, this time of counter k+1
        addTestStep((task_num) -> {
            assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
                    .isEqualTo(OVERWRITE_TWICE);
        });

        // SM step 7: each thread again verifies opacity, checking that it can re-read counter k
        addTestStep((task_num) -> {
            assertThat(sharedCounters.get(task_num).getValue())
                    .isEqualTo(OVERWRITE_ONCE);
        });

        // SM step 8: try to commit all transactions;
        // task k aborts only if one or both of (k-1), (k+1) committed before it
        addTestStep((task_num) -> {
            try {
                TXEnd();
                // reached only when TXEnd() did not throw: record the commit for task k
                commitStatus.set(task_num, COMMITVALUE);
            } catch (TransactionAbortedException tae) {
                // an abort is tolerated here and leaves commitStatus for task k untouched;
                // whether each abort was justified is checked after the engine has run
            }
        });

        // invoke the execution engine: interleaved when testInterleaved is set,
        // threaded otherwise
        if (testInterleaved) {
            scheduleInterleaved(PARAMETERS.CONCURRENCY_SOME, numTasks);
        } else {
            scheduleThreaded(PARAMETERS.CONCURRENCY_SOME, numTasks);
        }

        // verify that all aborts are justified
        for (int task_num = 0; task_num < numTasks; task_num++) {